package realtime.app.dwd.db;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import realtime.util.MyKafkaUtil;

import java.time.ZoneId;

public class Dwd_Trade_Payment_suc {
    public static void main(String[] args) {
        // TODO 1. 基本环境准备
        StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment();
        env.setParallelism(4);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.getConfig().setLocalTimeZone(ZoneId.of("GMT+8"));

        // TODO 2. 状态后端设置
//        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
//        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
//        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
//        env.getCheckpointConfig().enableExternalizedCheckpoints(
//                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
//        );
//        env.setRestartStrategy(RestartStrategies.failureRateRestart(
//                3, Time.days(1), Time.minutes(1)
//        ));
//        env.setStateBackend(new HashMapStateBackend());
//        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/flinkcdc/220926");
//        System.setProperty("HADOOP_USER_NAME", "atguigu");


        // TODO 3. 从 Kafka 读取业务数据，封装为 Flink SQL 表
        tableEnv.executeSql("create table topic_db(" +
                "`database` String,\n" +
                "`table` String,\n" +
                "`type` String,\n" +
                "`data` map<String, String>,\n" +
                "`old` map<String, String>,\n" +
                "`proc_time` as PROCTIME(),\n" +
                "`ts` string\n" +
                ")" + MyKafkaUtil.getKafkaDDL("topic_db", "dwd_trade_payment_suc_edu"));
//        tableEnv.sqlQuery("select data['user_id'] from topic_db").execute().print();

        //  TODO 4. 筛选支付成功数据
        Table paymentInfo = tableEnv.sqlQuery("select\n" +
                        "data['order_id'] order_id,\n" +
                        "data['payment_type'] payment_type,\n" +
                        "data['callback_time'] callback_time,\n" +
                        "ts\n" +
                        "from topic_db\n" +
                        "where `table` = 'payment_info'\n"
                +
                "and `type` = 'insert'\n" +
//                "and data['user_id'] is not null\n" +
                "and data['payment_status']='1602'"
        );
        tableEnv.createTemporaryView("payment_info", paymentInfo);

//        tableEnv.sqlQuery("select * from payment_info").execute().print();


        // TODO 6. 创建 Kafka dwd_trade_pay_detail_suc 表
        tableEnv.executeSql("create table dwd_trade_payment_suc(\n" +
                "order_id string,\n" +
                "payment_type string,\n" +
                "callback_time string,\n" +
                "ts string,\n" +
                "primary key(order_id) not enforced\n" +
                ")" + MyKafkaUtil.getUpsertKafkaDDL("dwd_trade_payment_detail_suc"));


        // TODO 6. 将关联结果写入 Upsert-Kafka 表
        tableEnv.executeSql("" +
                "insert into dwd_trade_payment_suc select * from payment_info");

    }
}
